import time
import os
import re
import errno
import inspect
import datetime
import uuid
import traceback
import threading
import threadpool

from daemon import Daemon
from rpyc_rpc import RpycRpc
from remotecopy_log import Log
from utils import Exp, _dmsg, _dwarn, _derror, \
    etchosts_update, cpu_usage, _exec_shell, _exec_pipe, \
    mutil_exec, _get_value, _human_unreadable, _human_readable, \
    _exec_pipe1, _exec_remote1, _lock_file, _exec_system, _get_remote, \
    _get_backtrace

class RemoteCopySrv(Daemon):
    def __init__(self, config, conf, foreground=False, replace=False):
        self.log = Log(os.path.join(config.log_path, 'remotecopy_client.log'), foreground)
        self.config = config
        self.gloconf = conf['global']
        self.conf = conf['client']
        self.foreground = foreground
        self.replace = replace
        self.arg_lock = threading.Lock()
        self.cached = 0
        self.cache_lock = threading.Lock()

        pidfile = '/var/run/fusionstack_remotecopy_client.lock'
        super(RemoteCopySrv, self).__init__(pidfile, name='remotecopy_client')

    def __exec_check(self):
        if 'stor' in self.gloconf:
            if self.gloconf['stor']['host'].startswith('127.'):
                return False
            else:
                return True
        else:
            return False

    def __exec_cmd(self, cmd):
        if self.__exec_check():
            user = self.gloconf['stor']['user']
            password = self.gloconf['stor']['pass']
            host = self.gloconf['stor']['host']
            return _exec_remote1(host, cmd, user, password)
        else:
            return _exec_pipe1(cmd.split(), 0, True, 300)

    def __exec_get(self, cmd, remote, local):
        (out_msg, err_msg) = self.__exec_cmd(cmd)

        user = self.gloconf['stor']['user']
        password = self.gloconf['stor']['pass']
        host = self.gloconf['stor']['host']
        _get_remote(host, remote, local, user, password)

        cmd = "rm -rf %s" % remote
        (out_msg, err_msg) = self.__exec_cmd(cmd)

    def __snapshot_get_list(self, vol, srv):
        list = []
        vol_name = self.conf[vol]['volume'][0][0]
        cmd = "lich.snapshot --list %s" % vol_name
        (out_msg, err_msg) = self.__exec_cmd(cmd)
        if out_msg:
            return [x for x in out_msg.split() if x.startswith("%s.%s-%s-"%(self.config.prefix, vol, srv))]

        return []

    def __snapshot_create(self, vol, srv):
        vol_name = self.conf[vol]['volume'][0][0]
        snap_name = "%s.%s-%s-%d" %(self.config.prefix, vol, srv, time.time())
        self.log._dinfo("create snapshot:%s@%s"%(vol_name, snap_name))
        cmd = "lich.snapshot --create %s@%s --force -p 1" %(vol_name, snap_name)
        (out_msg, err_msg) = self.__exec_cmd(cmd)
        return snap_name

    def __snapshot_remove(self, vol, srv, snap_name):
        vol_name = self.conf[vol]['volume'][0][0]
        self.log._dinfo("remove snapshot:%s@%s"%(vol_name, snap_name))
        cmd = "lich.snapshot --remove %s@%s --force" %(vol_name, snap_name)
        (out_msg, err_msg) = self.__exec_cmd(cmd)

    def __snapshot_chunk_read(self, vol, srv, slist, i, l):
        vol_name = self.conf[vol]['volume'][0][0]
        snap = slist[0]
        chunk = os.path.join(self.config.shm, "%s/%s/%s"%(vol, srv, i))
        if not os.path.exists(os.path.dirname(chunk)):
            os.makedirs(os.path.dirname(chunk))

        if self.__exec_check():
            newuuid = str(uuid.uuid1()).replace('-', '')
            tmpfile = '/tmp/temp.%s' % newuuid
            try:
                cmd = "lich.snapshot --copy %s@%s %s~%s :%s" % (vol_name, snap, str(i), str(l), tmpfile)
                self.__exec_get(cmd, tmpfile, chunk)
            except:
                cmd = "rm -rf %s" % tmpfile
                (out_msg, err_msg) = self.__exec_cmd(cmd)
                raise
        else:
            cmd = "lich.snapshot --copy %s@%s %s~%s :%s" % (vol_name, snap, str(i), str(l), chunk)
            (out_msg, err_msg) = self.__exec_cmd(cmd)

        fd = open(chunk, 'rb')
        context = fd.read()
        fd.close()

        return context

    def __snapshot_chunk_inc(self, vol, srv, slist, i, l):
        vol_name = self.conf[vol]['volume'][0][0]
        chunk = os.path.join(self.config.shm, "%s/%s/%s"%(vol, srv, i))
        if not os.path.exists(os.path.dirname(chunk)):
            os.makedirs(os.path.dirname(chunk))

        if self.__exec_check():
            newuuid = str(uuid.uuid1()).replace('-', '')
            tmpfile = '/tmp/temp.%s' % newuuid
            try:
                cmd = "lich.snapshot --diff %s@%s~%s %s~%s :%s" % (vol_name,
                        slist[0], slist[-1], str(i), str(l), tmpfile)
                self.__exec_get(cmd, tmpfile, chunk)
            except Exp, e:
                cmd = "rm -rf %s" % tmpfile
                (out_msg, err_msg) = self.__exec_cmd(cmd)
                raise
        else:
            try:
                cmd = "lich.snapshot --diff %s@%s~%s %s~%s :%s" % (vol_name,
                        slist[0], slist[-1], str(i), str(l), chunk)
                (out_msg, err_msg) = self.__exec_cmd(cmd)
            except Exp, e:
                raise

        fd = open(chunk, 'rb')
        context = fd.read()
        fd.close()

        return context

    def __file_write(self, vol, srv, file, context):
        stat_path = os.path.join(self.config.stat_path, "%s/%s" % (vol, srv))
        if not os.path.exists(stat_path):
            os.makedirs(stat_path)

        fd = open("%s/%s" % (stat_path, file), 'wb')
        fd.write(str(context))
        fd.close()

    def __file_read(self, vol, server, file):
        stat_path = os.path.join(self.config.stat_path, "%s/%s"% (vol, server))
        if not os.path.exists(stat_path):
            os.makedirs(stat_path)

        try:
            fd = open("%s/%s" % (stat_path, file), 'rb')
            context = fd.read()
            fd.close()

            return context
        except:
            return None

    def __chunk_remove(self, vol, srv, i):
        chunk = os.path.join(self.config.shm, "%s/%s/%s"%(vol, srv, i))
        os.unlink(chunk)

    def __chunk_sync(self, vol, srv, rpc, i, l, neg):
        if neg['sync'] == 'all':
            chunk = self.__snapshot_chunk_read(vol, srv, neg['slist'], i, l)
        else:
            chunk = self.__snapshot_chunk_inc(vol, srv, neg['slist'], i, l)

        rpc.chunk_sync(i, l, chunk)
        if chunk:
            self.__chunk_remove(vol, srv, i)

    def remotecopy_chunk(self, arg):
        try:
            arg['work'] = True
            vol = arg['vol']
            srv = arg['srv']
            neg = arg['neg']
            rpc = arg['rpc']

            for i in range(arg['start'], arg['start'] + arg['num'], arg['qos']):
                l = arg['qos'] if (arg['qos'] + i) <= (arg['start'] + arg['num']) else arg['start'] + arg['num'] - i
                need = rpc.chunk_negotiation(i, l)

                if need:
                    while True:
                        if self.cached >= self.gloconf['cache']:
                            if arg['exit'] == True:
                                self.arg_lock.acquire()
                                self.threadarg.remove(arg)
                                self.arg_lock.release()
                                return
                            time.sleep(1)
                        else:
                            break

                    self.cache_lock.acquire()
                    self.cached += l
                    self.cache_lock.release()

                    self.__chunk_sync(vol, srv, rpc, i, l, neg)

                    self.cache_lock.acquire()
                    self.cached -= l
                    self.cache_lock.release()

                if arg['exit'] == True:
                    self.arg_lock.acquire()
                    self.threadarg.remove(arg)
                    self.arg_lock.release()
                    return
        except Exception, e:
            self.__volume_set_error(vol, srv, e)
            msg = traceback.format_exc()
            self.log._derror(msg)

        self.arg_lock.acquire()
        self.threadarg.remove(arg)
        self.arg_lock.release()
        return

    def __volume_exists(self, volume):
        try:
            cmd = "lich.inspect --stat %s" % volume
            (out_msg, err_msg) = self.__exec_cmd(cmd)
            return True
        except:
            return False

    def __volume_set_chknum(self, vol, srv):
        vol_name = self.conf[vol]['volume'][0][0]
        chknum = None
        chknum_local = self.__file_read(vol, srv, 'chknum')
        cmd = "lich.inspect --stat %s" % vol_name
        (out_msg, err_msg) = self.__exec_cmd(cmd)
        for i in out_msg.split('\n'):
            if i.startswith('chknum'):
                chknum = i.split()[-1]

        if chknum:
            if chknum_local is None:
                #self.log._dinfo("set %s %s chknum to '%s'" % (vol, srv, chknum))
                self.__file_write(vol, srv, 'chknum', chknum)
            elif chknum != chknum_local:
                #self.log._dinfo("set %s %s chknum to '%s'" % (vol, srv, chknum))
                self.__file_write(vol, srv, 'chknum', chknum)

            return int(chknum)

        return None

    def __volume_get_chknum(self, vol, srv):
        chknum = self.__file_read(vol, srv, 'chknum')
        if chknum is None:
            return self.__volume_set_chknum(vol, srv)

        return int(chknum) if chknum else None

    def __volume_set_curr(self, vol, srv, curr):
        return self.__file_write(vol, srv, 'curr', curr)

    def __volume_get_curr(self, vol, srv):
        if self.__file_read(vol, srv, 'curr') == None:
            return {'curr':'unknow', 'thread':0, 'left':0}
        return eval(self.__file_read(vol, srv, 'curr'))

    def __volume_set_lname(self, vol, srv, vol_name):
        return self.__file_write(vol, srv, 'volume', vol_name)

    def __volume_get_lname(self, vol, srv):
        return self.__file_read(vol, srv, 'volume')

    def __volume_set_rname(self, vol, srv, vol_name):
        return self.__file_write(vol, srv, 'volume_remote', vol_name)

    def __volume_get_rname(self, vol, srv):
        return self.__file_read(vol, srv, 'volume_remote')

    def __volume_set_status(self, vol, srv, status):
        return self.__file_write(vol, srv, 'status', status)

    def __volume_get_status(self, vol, srv):
        return self.__file_read(vol, srv, 'status')

    def __volume_set_error(self, vol, srv, error):
        return self.__file_write(vol, srv, 'error', error)

    def __volume_get_error(self, vol, srv):
        return self.__file_read(vol, srv, 'error')

    def __volume_set_last(self, vol, srv):
        return self.__file_write(vol, srv, 'last', int(time.time()))

    def __volume_get_slist(self, vol, srv):
        slist = self.__file_read(vol, srv, 'slist')
        return eval(slist) if slist is not None else []

    def __volume_set_slist(self, vol, srv, slist):
        return self.__file_write(vol, srv, 'slist', slist)

    def __volume_snapshot_prep(self, vol, srv, neg):
        slist_cmd =  self.__snapshot_get_list(vol, srv)
        slist_local = self.__volume_get_slist(vol, srv)
        if neg['sync'] == "all":
            if len(slist_cmd) == 0:
                snap_name = self.__snapshot_create(vol, srv)
                slist_cmd.append(snap_name)
                self.__volume_set_slist(vol, srv, slist_cmd)
            elif len(slist_cmd) == 1:
                if slist_cmd[-1] != slist_local[-1]:
                    raise Exp(errno.EPERM, "snapshot get by cmd is %s but local record snapshot is %s"
                            %(slist_cmd, slist_local))
            elif len(slist_cmd) == 2:
                self.__snapshot_remove(vol, srv, slist_cmd[0])
                slist_cmd.remove(slist_cmd[0])
                self.__volume_set_slist(vol, srv, slist_cmd)
            else:
                raise Exp(errno.EPERM, "snapshot get by cmd is %s but sync type is %s"
                        %(slist_cmd, neg['sync']))
        elif neg['sync'] == 'inc':
            if len(slist_cmd) == 1:
                if neg['slast'] != slist_cmd[0]:
                    raise Exp(errno.EPERM, "remote last sync snapshot is %s but snapshot get by cmd is %s"
                            %(neg['slast'], slist_cmd))
                snap_name = self.__snapshot_create(vol, srv)
                slist_cmd.append(snap_name)
                self.__volume_set_slist(vol, srv, slist_cmd)
            elif len(slist_cmd) == 2:
                if neg['slast'] not in slist_cmd:
                    raise Exp(errno.EPERM, "remote last sync snapshot is %s but snapshot get by cmd is %s"
                            %(neg['slast'], slist_cmd))
                elif neg['slast'] == slist_cmd[-1]:
                    self.__snapshot_remove(vol, srv, slist_cmd[0])
                    slist_cmd.remove(slist_cmd[0])
                    snap_name = self.__snapshot_create(vol, srv)
                    slist_cmd.append(snap_name)
                    self.__volume_set_slist(vol, srv, slist_cmd)
                elif neg['slast'] == slist_cmd[0]:
                    if slist_cmd != slist_local:
                        raise Exp(errno.EPERM, "snapshot get by cmd is %s but local record snapshot is %s"
                                %(slist_cmd, slist_local))
            else:
                raise Exp(errno.EPERM, "snapshot get by cmd is %s but sync type is %s"
                        %(slist_cmd, neg['sync']))

        return slist_cmd

    def __volume_snapshot_finish(self, vol, srv, neg):
        if neg['sync'] == "all":
            pass
        else:
            self.__snapshot_remove(vol, srv, neg['slist'][0])
            self.__volume_set_slist(vol, srv, neg['slist'][1:])

    def __volume_init(self, vol, srv, rpc):
        self.__volume_set_chknum(vol, srv);
        chknum = self.__volume_get_chknum(vol, srv)
        if chknum == 0:
            return False

        shm = os.path.join(self.config.shm, "%s/%s"%(vol, srv))
        if not os.path.exists(shm):
            os.makedirs(shm)

        vol_lname = self.conf[vol]['volume'][0][0]
        vol_rname = self.conf[vol]['volume'][0][1]
        reset = False

        status = self.__volume_get_status(vol, srv)
        if status is None:
            self.log._dinfo("set %s %s status to '%s'" % (vol, srv, 'begin'))
            self.__volume_set_status(vol, srv, 'begin')
            reset = True

        vol_local = self.__volume_get_lname(vol, srv)
        if vol_local is None:
            self.log._dinfo("set %s %s lname to '%s'" % (vol, srv, vol_lname))
            self.__volume_set_lname(vol, srv, vol_lname)
        elif vol_local != vol_lname:
            self.log._dinfo("volume %s %s lname '%s' change to '%s'" % (vol, srv, vol_local, vol_lname))
            self.__volume_set_lname(vol, srv, vol_lname)
            reset = True

        vol_remote = self.__volume_get_rname(vol, srv)
        if vol_remote is None:
            self.log._dinfo("set %s %s rname to '%s'" % (vol, srv, vol_rname))
            self.__volume_set_rname(vol, srv, vol_rname)
        elif vol_remote != vol_rname:
            self.log._dinfo("volume %s %s rname '%s' change to '%s'" % (vol, srv, vol_remote, vol_rname))
            self.__volume_set_rname(vol, srv, vol_rname)
            reset = True

        if reset:
            rpc.volume_reset()

        return True

    def __volume_next(self, vol, srv):
        last = self.__file_read(vol, srv, 'last')
        if last is None:
            return 1
        else:
            nxt = self.gloconf['interval'] - (int(time.time()) - int(last))
            return nxt if nxt > 1 else 1

    def __volume_next_real(self, vol, srv):
        last = self.__file_read(vol, srv, 'last')
        if last is None:
            return 0
        else:
            nxt = self.gloconf['interval'] - (int(time.time()) - int(last))
            return nxt if nxt > 0 else 0

    def __volume_prev(self, vol, srv):
        last = self.__file_read(vol, srv, 'last')
        if last is None:
            return 0
        else:
            return int(time.time()) - int(last)

    def __get_weight(self, vol, srv):
        weight_sum = 0
        chknum = self.__volume_get_chknum(vol, srv)
        for i in self.conf:
            for s in self.conf[i]['server']:
                #if self.__threadpool_work(i, s[0]):
                #    continue
                chk = self.__volume_get_chknum(i, s[0])
                nxt = self.__volume_next(i, s[0])
                weight_sum += int(chk)/nxt

        weight = int((self.gloconf['thread'] - len(self.conf)) * int(chknum) / weight_sum)
        return weight if weight > 1 else 1

    def __volume_begin(self, vol, srv, rpc):
        self.__volume_set_error(vol, srv, "")
        self.__volume_set_curr(vol, srv, {'curr':'begin', 'thread':0, 'left':0})

        chknum = self.__volume_get_chknum(vol, srv)
        neg = rpc.volume_negotiation(chknum)

        weight = self.__get_weight(vol, srv)
        slist = self.__volume_snapshot_prep(vol, srv, neg)

        neg['slist'] = slist
        neg['chknum'] = chknum

        arglist = []
        avg = chknum / weight if chknum % weight == 0 else chknum / weight + 1

        avg = avg if avg % self.gloconf['qos'] == 0 else (avg / self.gloconf['qos'] + 1) * self.gloconf['qos']

        idx = 0
        offset = 0
        left = chknum
        while left > 0:
            num = avg if avg < left else left
            arg = {'type':'chunk', 'work':False, 'exit':False,
                    'vol':vol, 'srv':srv, 'rpc':rpc, 'neg':neg,
                    'start':offset, 'num':num, 'qos':self.gloconf['qos'], 'idx':idx}
            arglist.append(arg)

            idx += 1
            offset += num
            left -= num

        self.__volume_set_curr(vol, srv, {'curr':'begin', 'thread':len(arglist), 'left':chknum})
        self.log._dinfo("volume %s %s start with %s threads neg:%s" %(vol, srv, len(arglist), neg))
        self.arg_lock.acquire()
        self.threadarg.extend(arglist)
        self.arg_lock.release()

        self.__threadpool_request(self.remotecopy_chunk, arglist)
        return neg

    def __volume_wait(self, vol, srv, rpc, neg):
        while self.__threadpool_work(vol, srv):
            thread = 0
            left = 0
            exit = False

            self.arg_lock.acquire()
            for arg in self.threadarg:
                if arg['vol'] == vol and arg['srv'] == srv:
                    if arg['exit']:
                        exit = True
                    thread += 1
            self.arg_lock.release()

            left = rpc.volume_left(neg)
            self.__volume_set_curr(vol, srv, {'curr': neg['sync'] if not exit else 'exit',
                'thread':thread, 'left':left})
            time.sleep(1)

    def __volume_finish(self, vol, srv, rpc, neg, arg):
        rpc.volume_finish(neg)
        self.__volume_snapshot_finish(vol, srv, neg)
        self.__volume_set_status(vol, srv, 'finish')
        self.__volume_set_last(vol, srv)
        self.log._dinfo("volume %s %s set status to '%s'" % (vol, srv, 'finish'))
        self.__volume_set_curr(vol, srv, {'curr':'sleep', 'thread':0, 'left':0})

    def remotecopy_volume(self, arg):
        try:
            arg['work'] = True
            vol = arg['vol']
            srv = arg['srv']
            rpc = arg['rpc']

            rpc.connect()

            need = self.__volume_init(vol, srv, rpc)
            if need:
                neg = self.__volume_begin(vol, srv, rpc)

                self.__volume_wait(vol, srv, rpc, neg)
                self.__volume_finish(vol, srv, rpc, neg, arg)

            rpc.close()

        except Exception, e:
            if str(type(e)) == "<class 'rpyc.core.vinegar/utils.utils.Exp'>":
                err = str(e).splitlines()[-1]
                if 'error:' in err:
                    err = err.split('error:')[1].split('stdout:')[0].strip().replace('\\n', '')
                else:
                    err = err.split('err:')[1].split('stdout:')[0].strip().replace('\\n', '')
                errno = str(e).splitlines()[-1].split('errno:')[1].split('err:')[0].strip().replace(',', '')
                self.__volume_set_error(vol, srv, "[Errno %s] %s" %(errno, err))
            else:
                self.__volume_set_error(vol, srv, e)

            msg = traceback.format_exc()
            self.log._derror(msg)

            time.sleep(10)

        self.arg_lock.acquire()
        self.threadarg.remove(arg)
        self.arg_lock.release()
        return

    def __threadpool_init(self):
        self.threadpool = threadpool.ThreadPool(self.gloconf['thread'])
        self.threadarg = []

    def __threadpool_request(self, func, arg):
        requests = threadpool.makeRequests(func, arg)
        [self.threadpool.putRequest(req) for req in requests]

    def __threadpool_exit(self):
        self.cache_lock.acquire()
        self.cached = 0
        self.cache_lock.release()

        self.arg_lock.acquire()
        for arg in self.threadarg:
            arg['exit'] = True
        self.arg_lock.release()

        self.threadpool.wait()

    def __threadpool_exists(self, vol, srv):
        exists = False
        self.arg_lock.acquire()
        for arg in self.threadarg:
            if arg['vol'] == vol and arg['work'] == True and arg['srv'] == srv:
                exists = True
        self.arg_lock.release()

        return exists

    def __threadpool_volcount(self):
        volnum = 0
        self.arg_lock.acquire()
        for arg in self.threadarg:
            if arg['type'] == 'volume':
                volnum += 1
        self.arg_lock.release()

        return volnum

    def __threadpool_work(self, vol, srv):
        work = False
        self.arg_lock.acquire()
        for arg in self.threadarg:
            if arg['vol'] == vol and arg['type'] == 'chunk' and arg['srv'] == srv:
                work = True
        self.arg_lock.release()

        return work

    def run(self):
        self.log._dinfo("%s started"%self.name)

        _exec_system("rm -rf %s/*" % self.config.shm)
        self.__threadpool_init()

        while True:
            conf = self.config.get_conf('client')
            if 'global' not in conf:
                self.__threadpool_exit()
                return
            if conf['global']['thread'] != self.gloconf['thread']:
                self.__threadpool_exit()
                self.__threadpool_init()
                _exec_system("rm -rf %s/*" % self.config.shm)

            if conf['global'] != self.gloconf:
                self.gloconf = conf['global']
            elif  conf['client'] != self.conf:
                self.conf = conf['client']

            b = False
            for vol in self.conf:
                for s in self.conf[vol]['server']:
                    status = self.__volume_get_status(vol, s[0])
                    if self.__threadpool_exists(vol, s[0]):
                        continue
                    elif status == 'finish':
                        prev = self.__volume_prev(vol, s[0])
                        if prev >= 0 and prev < self.conf[vol]['interval']:
                           continue

                    vol_count =  self.__threadpool_volcount()
                    if self.gloconf['thread'] / 2 < vol_count:
                        b = True

                    self.arg_lock.acquire()
                    if self.gloconf['thread'] - len(self.threadarg) < 2:
                        b = True
                    self.arg_lock.release()

                    if b:
                        break

                    arg = {'type':'volume', 'work':False, 'exit':False, 'vol':vol, 'srv':s[0],
                            'rpc':RpycRpc(self.gloconf['uuid'], vol, self.conf[vol], s[0], s[1])}

                    self.arg_lock.acquire()
                    self.threadarg.append(arg)
                    self.arg_lock.release()

                    self.__threadpool_request(self.remotecopy_volume, [arg])
                if b:
                    break

            time.sleep(1)

    def stat(self, foreground):
        if not super(RemoteCopySrv, self).stat() and not foreground:
            raise Exp(errno.EPERM, "process not running")

        for vol in self.conf:
            print 'name: %s'% vol
            print 'vol: %s'% (self.conf[vol]['volume'][0][0])
            if len(self.conf[vol]['server']) == 0:
                print 'server not found'
            for s in self.conf[vol]['server']:
                srv = s[0]
                error = self.__volume_get_error(vol, srv)
                chknum = self.__volume_get_chknum(vol, srv)
                curr = self.__volume_get_curr(vol, srv)

                print '    server: %s' % srv
                print '    error: %s' % error
                print '    status: %s' % curr['curr']
                print '    thread: %s/%s' % (curr['thread'], self.gloconf['thread'])
                print '    chunk: %s/%s' % (curr['left'], chknum)
                print '    interval: %s/%s' % (self.__volume_next_real(vol, srv), self.conf[vol]['interval'])
                print ''

    def stop(self):
        self.log._dinfo("%s stopped"%self.name)
        super(RemoteCopySrv, self).stop()
